Add fast raw memory search path #194

Open
cnguyen14 wants to merge 3 commits into XortexAI:main from cnguyen14:codex/fast-search-path-163-v2

Conversation

@cnguyen14

Replaces #189, which was closed accidentally and could not be reopened.

Closes #163

Summary

  • expands /v1/memory/search into a low-latency raw search path across profile, temporal, summary, snippet, and code domains
  • keeps answer synthesis optional via answer=true
  • adds cached profile catalogs and cached raw retrieval plans
  • returns per-mode latency timings plus rolling p50/p95/p99 stats

Follow-up after code-assist review

  • guards latency sample writes/snapshots with a lock
  • runs raw domain searches concurrently with asyncio.gather
  • bounds profile catalog cache with TTL plus max-user eviction
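As a rough illustration of the lock-guarded latency samples and rolling percentiles described above, here is a minimal sketch. `LatencyTracker`, its window size, and the nearest-rank percentile rule are assumptions for illustration; the PR's actual helper may differ.

```python
import math
import threading
from collections import deque


class LatencyTracker:
    """Rolling per-mode latency samples guarded by a lock (hypothetical helper)."""

    def __init__(self, max_samples: int = 1000) -> None:
        self._lock = threading.Lock()
        self._samples: dict[str, deque[float]] = {}
        self._max_samples = max_samples

    def record(self, mode: str, elapsed_ms: float) -> None:
        # Writes hold the lock so concurrent requests cannot corrupt the window.
        with self._lock:
            window = self._samples.setdefault(mode, deque(maxlen=self._max_samples))
            window.append(elapsed_ms)

    def snapshot(self, mode: str) -> dict[str, float]:
        # Copy under the lock, then sort outside it to keep the critical section short.
        with self._lock:
            values = list(self._samples.get(mode, ()))
        if not values:
            return {}
        values.sort()

        def percentile(p: float) -> float:
            # Nearest-rank percentile: smallest value with at least p% of samples at or below it.
            rank = max(1, math.ceil(p / 100 * len(values)))
            return values[rank - 1]

        return {"p50": percentile(50), "p95": percentile(95), "p99": percentile(99)}
```

The bounded `deque` drops the oldest sample once the window is full, so the stats track recent traffic rather than the whole process lifetime.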

Tests

  • pytest tests/api/test_dependencies_and_routes.py tests/unit/test_schemas.py -q
  • python3 -m ruff check src/api/schemas.py src/api/routes/memory.py src/pipelines/retrieval.py tests/api/test_dependencies_and_routes.py

@cnguyen14 force-pushed the codex/fast-search-path-163-v2 branch from 44c885e to b7c56e7 on May 21, 2026 17:28
@cnguyen14
Author

Rebased this replacement PR on the latest main and resolved the conflict from the new durable memory job routes. The branch is mergeable again.

Latest local validation after rebase:

  • pytest tests/api/test_dependencies_and_routes.py tests/unit/test_schemas.py -q -> 8 passed
  • python3 -m ruff check src/api/schemas.py src/api/routes/memory.py src/pipelines/retrieval.py tests/api/test_dependencies_and_routes.py -> passed

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request enhances the memory search endpoint by introducing concurrent search execution via a retrieval plan, adding support for synthesized answers, and expanding search domains to include snippets and code. It also implements latency tracking with percentile statistics and a TTL-based cache for profile catalogs. Feedback focuses on ensuring thread-safety for the new caches, improving type hints in helper functions, adopting more idiomatic awaitable detection using inspect.isawaitable, and refining the cache eviction strategy to follow an LRU policy.

I am having trouble creating individual review comments. Click here to see my feedback.

src/pipelines/retrieval.py (137-140)

high

The RetrievalPipeline cache dictionaries are accessed and modified in an asynchronous context but lack thread-safety guards. While asyncio tasks on a single thread are safe if they don't yield, these methods are synchronous and could be called from multiple threads if the server is configured with a thread pool. Adding a threading.Lock would prevent potential race conditions during cache pruning and updates.

src/api/routes/memory.py (143)

medium

The _timed helper function is missing type hints for its arguments, which reduces code clarity and makes it harder for static analysis tools to verify usage. Adding hints for func, args, and kwargs would improve maintainability.

async def _timed(mode: str, func: Callable, *args: Any, threaded: bool = False, **kwargs: Any):

src/api/routes/memory.py (149-150)

medium

Checking for __await__ is a bit fragile for detecting awaitables. It is more idiomatic and robust to use inspect.isawaitable from the standard library to determine if the result needs to be awaited.

        if inspect.isawaitable(result):
            result = await result
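Putting the two `_timed` suggestions together, a helper along these lines would cover sync, async, and threaded callables. This is a sketch under assumptions: the name `timed`, the returned `(mode, result, elapsed_ms)` tuple, and the use of `time.perf_counter` are illustrative and not taken from the PR diff.

```python
import asyncio
import inspect
import time
from typing import Any, Callable


async def timed(
    mode: str,
    func: Callable[..., Any],
    *args: Any,
    threaded: bool = False,
    **kwargs: Any,
) -> tuple[str, Any, float]:
    """Run func, await its result if needed, and report elapsed milliseconds."""
    start = time.perf_counter()
    if threaded:
        # Blocking callables run in a worker thread so the event loop stays free.
        result = await asyncio.to_thread(func, *args, **kwargs)
    else:
        result = func(*args, **kwargs)
        # inspect.isawaitable covers coroutines, Tasks, Futures, and __await__ objects.
        if inspect.isawaitable(result):
            result = await result
    elapsed_ms = (time.perf_counter() - start) * 1000.0
    return mode, result, elapsed_ms
```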

src/pipelines/retrieval.py (507)

medium

Updating the cache entry by assignment in a standard Python dictionary does not move the key to the end of the insertion order. This results in a FIFO (First-In-First-Out) eviction policy rather than LRU (Least Recently Used). For an active user cache, re-inserting the key or using collections.OrderedDict.move_to_end is preferred to keep frequently accessed items in the cache.

            self._profile_catalog_cache.pop(user_id)
            self._profile_catalog_cache[user_id] = (now, cached[1], cached[2])
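The `collections.OrderedDict.move_to_end` alternative mentioned above could look roughly like the sketch below. `TtlLruCache` is a hypothetical class; the 60 s TTL and 256-entry bound mirror the values quoted elsewhere in this thread. Unlike the pop/reinsert snippet, this sketch does not refresh the stored timestamp on a hit, so the TTL counts from the last write. It also omits locking for brevity.

```python
import time
from collections import OrderedDict
from typing import Any, Optional


class TtlLruCache:
    """Bounded cache with TTL expiry and LRU eviction (illustrative only)."""

    def __init__(self, ttl_seconds: float = 60.0, max_entries: int = 256) -> None:
        self._ttl = ttl_seconds
        self._max = max_entries
        self._entries: OrderedDict[str, tuple[float, Any]] = OrderedDict()

    def get(self, key: str, now: Optional[float] = None) -> Any:
        now = time.monotonic() if now is None else now
        entry = self._entries.get(key)
        if entry is None:
            return None
        stored_at, value = entry
        if now - stored_at > self._ttl:
            # Expired entries are dropped instead of being served.
            del self._entries[key]
            return None
        # A hit moves the key to the "most recently used" end.
        self._entries.move_to_end(key)
        return value

    def put(self, key: str, value: Any, now: Optional[float] = None) -> None:
        now = time.monotonic() if now is None else now
        self._entries[key] = (now, value)
        self._entries.move_to_end(key)
        # Evict from the "least recently used" end until within bounds.
        while len(self._entries) > self._max:
            self._entries.popitem(last=False)
```

With plain assignment and no `move_to_end`, the oldest inserted key would be evicted first regardless of how often it is read, which is the FIFO behavior the comment warns about.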

@cnguyen14
Author

Pushed follow-up commit 42b5f20 addressing the latest code-assist feedback:

  • added locks around RetrievalPipeline profile catalog and raw retrieval plan caches
  • added type hints for _timed
  • switched awaitable detection to inspect.isawaitable
  • changed profile cache hit handling to pop/reinsert so bounded eviction behaves as LRU rather than FIFO

Validation:

  • pytest tests/api/test_dependencies_and_routes.py tests/unit/test_schemas.py -q -> 8 passed
  • python3 -m ruff check src/api/schemas.py src/api/routes/memory.py src/pipelines/retrieval.py tests/api/test_dependencies_and_routes.py -> passed

@greptile-apps

greptile-apps Bot commented May 23, 2026

Greptile Summary

This PR expands /v1/memory/search from a sequential 3-domain fetch into a concurrent 5-domain raw search path, adds optional LLM answer synthesis via answer=true, and introduces per-domain latency tracking with rolling p50/p95/p99 stats backed by process-level deques.

  • Concurrent domain search: all five domains (profile, temporal, summary, snippet, code) now run in parallel via asyncio.gather; sync domains are offloaded to threads with asyncio.to_thread.
  • Latency instrumentation: a new _timed helper records per-domain elapsed time, which feeds into SearchResponse.latency_ms and latency_stats using a thread-safe locked deque per mode.
  • Caching: _fetch_profile_catalog gains a TTL+LRU cache (60 s, max 256 users) and raw_retrieval_plan adds a locked dict cache for normalized domain tuples — but the raw _search_profile path bypasses the catalog cache entirely, so the caching benefit is not realized for profile on the raw search path.
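The concurrent pattern summarized above (async domains awaited directly, sync domains offloaded with `asyncio.to_thread`, everything joined by `asyncio.gather`) can be sketched as follows. The two search functions are hypothetical stand-ins for the real domain searches, and only two of the five domains are shown.

```python
import asyncio
import time


def search_profile(query: str) -> list[str]:
    # Hypothetical blocking (sync) domain search.
    time.sleep(0.05)
    return [f"profile:{query}"]


async def search_summary(query: str) -> list[str]:
    # Hypothetical natively async domain search.
    await asyncio.sleep(0.05)
    return [f"summary:{query}"]


async def search_all(query: str) -> dict[str, list[str]]:
    tasks = {
        # Sync work goes to a worker thread so it does not block the event loop.
        "profile": asyncio.to_thread(search_profile, query),
        "summary": search_summary(query),
    }
    # gather returns results in the same order as its arguments.
    results = await asyncio.gather(*tasks.values())
    return dict(zip(tasks.keys(), results))


if __name__ == "__main__":
    print(asyncio.run(search_all("tea")))
```

Because both searches overlap, total wall time is close to the slowest domain rather than the sum of all domains.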

Confidence Score: 3/5

Safe to merge for non-answer requests; when answer=true with the profile domain, every request issues two identical vector store queries — once via the concurrent raw path and again inside pipeline.run — undercutting the new cache entirely on that path.

The concurrent gather, latency tracking, and schema changes are solid. The profile double-fetch when answer=True is a real efficiency defect introduced by this PR: _search_profile goes directly to the vector store while _fetch_profile_catalog (called inside pipeline.run) holds the TTL cache. Both hit the same query on every request, meaning the cache the PR advertises never fires for the raw profile path. The plan-cache key bug and the stampede window in _fetch_profile_catalog are additional correctness-adjacent issues that compound the concern.

src/api/routes/memory.py (_search_profile bypasses cache) and src/pipelines/retrieval.py (raw_retrieval_plan key, _fetch_profile_catalog double-check)

Important Files Changed

| Filename | Overview |
| --- | --- |
| src/api/routes/memory.py | Refactors search_memory to run all domain searches concurrently via asyncio.gather, adds _timed helper for per-domain latency tracking, and adds snippet/code search paths; the raw profile search bypasses the new pipeline cache (causing double DB queries when answer=True). |
| src/pipelines/retrieval.py | Adds TTL+LRU cache for _fetch_profile_catalog and a locked plan cache via raw_retrieval_plan; the plan cache key includes answer which never changes the returned value (wasted entries), and the profile cache write lacks a double-check that risks stampede overwrites. |
| src/api/schemas.py | Extends SearchRequest with answer flag and deduplicated domains, adds SearchLatencySummary model, and expands SearchResponse with answer/confidence/latency fields; no issues found. |
| tests/api/test_dependencies_and_routes.py | Adds a complete FakeRetrievalPipeline with all five domain search stubs and a new integration test covering raw hits, per-domain latency, and optional answer synthesis; coverage looks solid for the happy path. |

Sequence Diagram

sequenceDiagram
    participant Client
    participant Route as search_memory (route)
    participant Plan as raw_retrieval_plan (cache)
    participant Gather as asyncio.gather
    participant VS as vector_store
    participant Neo4j as neo4j
    participant Pipeline as pipeline.run (answer path)

    Client->>Route: POST /v1/memory/search
    Route->>Plan: raw_retrieval_plan(domains, answer)
    Plan-->>Route: normalized domain tuple

    Route->>Gather: gather(_timed tasks for all domains)
    par profile (thread)
        Gather->>VS: search_by_metadata (profile)
        VS-->>Gather: raw results
    and temporal (thread)
        Gather->>Neo4j: search_events_by_embedding
        Neo4j-->>Gather: raw events
    and summary/snippet/code (async)
        Gather->>VS: search_by_text (summary / code)
        Gather->>VS: _search_snippet (sandboxed ns)
        VS-->>Gather: raw results
    end
    Gather-->>Route: all domain results + latency

    opt "answer=true"
        Route->>Pipeline: pipeline.run(query, user_id, top_k)
        Note over Pipeline,VS: _fetch_profile_catalog checks cache,\nthen queries VS again (cache miss on first call)
        Pipeline->>VS: search_by_metadata (profile) duplicate fetch
        Pipeline-->>Route: answer + sources + confidence
    end

    Route-->>Client: SearchResponse(results, latency_ms, latency_stats, [answer])

Comments Outside Diff (1)

  1. src/api/routes/memory.py, lines 1041-1049

    P1 Raw profile search bypasses the new cache on every request

    _search_profile calls pipeline.vector_store.search_by_metadata directly, completely bypassing _fetch_profile_catalog's TTL cache that this PR added. When answer=True and "profile" is in the requested domains, both the raw concurrent task and the subsequent pipeline.run call independently hit the vector store with the identical query (filters={"user_id": user_id, "domain": "profile"}, top_k=100). That's two redundant round-trips per request, every time — the cache never benefits the raw path. The fix would be to have _search_profile call pipeline._fetch_profile_catalog(user_id) and extract results from the cached return value instead of re-querying.
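    To make the proposed fix concrete, the sketch below counts vector store queries when the raw profile search goes through a cached catalog fetch. Everything here is a hypothetical stand-in: `FakeVectorStore`, `FakePipeline`, `fetch_profile_catalog`, and `search_profile_cached` are illustrative names, and the real `_fetch_profile_catalog` return shape is not shown in this thread, so the sketch assumes it yields the raw result list.

```python
from typing import Any


class FakeVectorStore:
    """Counts queries so the effect of caching is observable (test double)."""

    def __init__(self) -> None:
        self.calls = 0

    def search_by_metadata(self, filters: dict[str, Any], top_k: int) -> list[dict[str, Any]]:
        self.calls += 1
        return [{"user_id": filters["user_id"], "text": "likes tea"}]


class FakePipeline:
    """Stand-in for the retrieval pipeline with a per-user catalog cache."""

    def __init__(self, store: FakeVectorStore) -> None:
        self.vector_store = store
        self._cache: dict[str, list[dict[str, Any]]] = {}

    def fetch_profile_catalog(self, user_id: str) -> list[dict[str, Any]]:
        # Only the first call per user reaches the vector store.
        if user_id not in self._cache:
            self._cache[user_id] = self.vector_store.search_by_metadata(
                filters={"user_id": user_id, "domain": "profile"}, top_k=100
            )
        return self._cache[user_id]


def search_profile_cached(pipeline: FakePipeline, user_id: str) -> list[dict[str, Any]]:
    # The raw path reuses the cached catalog instead of re-querying the store.
    return list(pipeline.fetch_profile_catalog(user_id))
```

    With this shape, a raw profile search followed by the answer path for the same user issues one store query instead of two.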

Reviews (1): Last reviewed commit: "Address cache review feedback"

Comment on lines +565 to +573
    def raw_retrieval_plan(self, domains: List[str], answer: bool = False) -> tuple[str, ...]:
        """Return a cached deterministic raw-search plan for the requested domains."""
        ordered_allowed = ("profile", "temporal", "summary", "snippet", "code")
        normalized = tuple(d for d in ordered_allowed if d in set(domains))
        key = (normalized, answer)
        with self._raw_retrieval_plan_cache_lock:
            if key not in self._raw_retrieval_plan_cache:
                self._raw_retrieval_plan_cache[key] = normalized
            return self._raw_retrieval_plan_cache[key]

P2 answer is part of the cache key but is never used to compute the returned tuple — both (normalized, True) and (normalized, False) store and return the same normalized value. This silently doubles the number of entries in the plan cache for any domain set that is queried with both values, and the parameter looks meaningful when it isn't. Either use answer to actually vary the plan, or drop it from the key entirely.

Suggested change
-    def raw_retrieval_plan(self, domains: List[str], answer: bool = False) -> tuple[str, ...]:
-        """Return a cached deterministic raw-search plan for the requested domains."""
-        ordered_allowed = ("profile", "temporal", "summary", "snippet", "code")
-        normalized = tuple(d for d in ordered_allowed if d in set(domains))
-        key = (normalized, answer)
-        with self._raw_retrieval_plan_cache_lock:
-            if key not in self._raw_retrieval_plan_cache:
-                self._raw_retrieval_plan_cache[key] = normalized
-            return self._raw_retrieval_plan_cache[key]
+    def raw_retrieval_plan(self, domains: List[str], answer: bool = False) -> tuple[str, ...]:
+        """Return a cached deterministic raw-search plan for the requested domains."""
+        ordered_allowed = ("profile", "temporal", "summary", "snippet", "code")
+        normalized = tuple(d for d in ordered_allowed if d in set(domains))
+        with self._raw_retrieval_plan_cache_lock:
+            if normalized not in self._raw_retrieval_plan_cache:
+                self._raw_retrieval_plan_cache[normalized] = normalized
+            return self._raw_retrieval_plan_cache[normalized]
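
The effect of the extra key component can be checked in isolation. The sketch below is a standalone illustration, not the pipeline code: `normalize` and the two dictionaries are hypothetical, and the domain order is copied from the snippet above.

```python
ORDERED_ALLOWED = ("profile", "temporal", "summary", "snippet", "code")


def normalize(domains: list[str]) -> tuple[str, ...]:
    # Build the set once, then keep only allowed domains in canonical order.
    requested = set(domains)
    return tuple(d for d in ORDERED_ALLOWED if d in requested)


cache_keyed_with_answer: dict[tuple[tuple[str, ...], bool], tuple[str, ...]] = {}
cache_keyed_by_plan: dict[tuple[str, ...], tuple[str, ...]] = {}

for answer in (True, False):
    plan = normalize(["code", "profile"])
    # Same plan, but two distinct keys when answer is part of the key.
    cache_keyed_with_answer[(plan, answer)] = plan
    cache_keyed_by_plan[plan] = plan

print(len(cache_keyed_with_answer), len(cache_keyed_by_plan))
```

Both caches store the identical tuple, so keying on `answer` only adds entries without changing any returned plan.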

Comment on lines +546 to +548
        with self._profile_catalog_cache_lock:
            self._prune_profile_catalog_cache(now)
            self._profile_catalog_cache[user_id] = (now, catalog, results)

P2 Missing double-check before cache write

When two concurrent calls for the same user_id both miss the cache in the first with block, they both release the lock and both execute the expensive search_by_metadata query. When the second with block is reached, there's no re-check of whether another thread already populated the entry. The second writer overwrites the first with a stale now timestamp (captured at the very start of the function). A simple check like if user_id not in self._profile_catalog_cache: before assigning would avoid the redundant overwrite and protect against cache stampede on concurrent first-hit requests for the same user.
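
A minimal sketch of the re-check described above, assuming a simplified cache without TTL pruning. `ProfileCatalogCache`, `get_or_load`, and the `loader` callback are hypothetical names. Note that the re-check only prevents the second writer from overwriting the first; both callers may still run the expensive query, and avoiding that would need per-key in-flight tracking, which is not shown here.

```python
import threading
import time
from typing import Any, Callable


class ProfileCatalogCache:
    """Cache whose write path re-checks for an existing entry (illustrative)."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._entries: dict[str, tuple[float, Any]] = {}

    def get_or_load(self, user_id: str, loader: Callable[[str], Any]) -> Any:
        with self._lock:
            cached = self._entries.get(user_id)
        if cached is not None:
            return cached[1]

        # The expensive query runs outside the lock so other users are not blocked.
        value = loader(user_id)

        with self._lock:
            existing = self._entries.get(user_id)
            if existing is not None:
                # Another caller populated the entry first; keep theirs.
                return existing[1]
            # Timestamp is taken at write time, not at function entry.
            self._entries[user_id] = (time.monotonic(), value)
        return value
```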

Development

Successfully merging this pull request may close these issues.

Add low-latency raw search path separate from agentic answer synthesis

1 participant